require(ggplot2)
require(plyr)
require(grid) # contains the arrow function
require(biOps)
require(doMC) # for parallel code
require(EBImage)
## To install EBImage
# source("http://bioconductor.org/biocLite.R")
# biocLite("EBImage")

# start parallel environment
registerDoMC()
# functions for converting images back and forth
im.to.df<-function(in.img) {
    # convert a matrix image into an (x, y, val) data frame
    out.im<-expand.grid(x=1:nrow(in.img),y=1:ncol(in.img))
    out.im$val<-as.vector(in.img)
    out.im
}
df.to.im<-function(in.df,val.col="val",inv=F) {
  # convert an (x, y, val) data frame back into a greyscale matrix image
  in.vals<-in.df[[val.col]]
  if(is.logical(in.vals)) in.vals<-as.integer(in.vals)*255
  if(inv) in.vals<-255-in.vals
  out.mat<-matrix(in.vals,nrow=length(unique(in.df$x)),byrow=F)
  attr(out.mat,"type")<-"grey"
  out.mat
}
ddply.cutcols<-function(...,cols=1) {
  # run standard ddply command
  cur.table<-ddply(...)
  cutlabel.fixer<-function(oVal) {
    sapply(oVal,function(x) {
      cnv<-as.character(x)
      mean(as.numeric(strsplit(substr(cnv,2,nchar(cnv)-1),",")[[1]]))
    })
  }
  cutname.fixer<-function(c.str) {
    s.str<-strsplit(c.str,"(",fixed=T)[[1]]
    t.str<-strsplit(paste(s.str[2:length(s.str)],collapse="("),",")[[1]]
    paste(t.str[1:(length(t.str)-1)],collapse=",")
  }
  for(i in c(1:cols)) {
    cur.table[,i]<-cutlabel.fixer(cur.table[,i])
    names(cur.table)[i]<-cutname.fixer(names(cur.table)[i])
  }
  cur.table
}

Making Scaling Simple: Using Big Data to Improve Imaging

author: Kevin Mader (slides: http://bit.ly/1xpbUSf)
date: January 2015, Swiss Big Data User Group Meeting
width: 1440
height: 900
transition: rotate
css: beigeplus.css
navigation: slide

SIL

4Quant Paul Scherrer Institut ETH Zurich

Outline

The Tools


The Science

Internal Structures

Synchrotron-based X-Ray Tomographic Microscopy

The only technique which can do all

SLS and TOMCAT

[1] Mokso et al., J. Phys. D, 46(49),2013


Courtesy of M. Pistone at U. Bristol

Microscopy in 2014

Ever more sensitive and faster hardware means image acquisition possibilities are growing rapidly

X-Ray

Optical


Images are only as useful as what you can do with them; the bottleneck isn't measurement speed, but analysis

Time Consumption

Adapted from: Sboner A,et. al. Genome Biology, 2011

The changing world

Moore's law is still alive: the number of transistors is doubling every 18 months. The traditional solution: a bigger problem means a more expensive computer

Moore's Law


But, not how it used to be

Clock speed (cycles per second) and performance per clock cycle have saturated, so parallelization is needed for further gains.

Processor Growth

The Problem

There is a flood of new data

What took an entire PhD 3-4 years ago can now be measured in a weekend, or even in several seconds. Analysis tools have not kept up: they are difficult to customize and usually highly specific.

Optimized Data-Structures do not fit

Data-structures that were fast and efficient for computers with 640kb of memory do not make sense anymore

Single-core computing is too slow

CPUs are not getting much faster, but there are a lot more of them. Iterating through a huge array takes almost as long on 2014 hardware as on 2006 hardware

The reality

X-Ray Microscopy isn't the first

Google, Facebook, Yahoo, and Amazon had these, or very similar, problems years ago. They hired a lot of very competent computer science PhDs to solve them.

Cloud computing is ubiquitous and cheap

Cloud computing infrastructures are 10-100X cheaper than labor costs, usually more reliable than internal systems, and easily accessible from almost anywhere.


The big why!

What we want?

Fast, scalable computing

Run it on your laptop, local cluster, or 5000 machines at Amazon without rewriting anything

Reliable

Easy

Big Data: Standard Definition

Velocity, Volume, Variety

When a ton of heterogeneous data is coming in fast.

Performant, scalable, and flexible

Save Everything

Tons of Hype!

What is big data really?


Big Data: A brief introduction

Google ran head-first into Big Data trying to create an index of the internet. Single computers cannot do everything, and writing parallel and distributed code is complicated, difficult to test, easy to break, and often non-deterministic. The resulting code is a tangled, inseparable mess of computation, data, and processing management.

MapReduce

A robust, fault-tolerant framework for handling distributed processing and data storage, on which complicated analyses can be run declaratively by specifying what, not how. It reduced the data-processing vocabulary to two words: Map and Reduce. Recreated and open-sourced by Yahoo as Hadoop.
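To make the two-word vocabulary concrete, here is a minimal word-count sketch in plain Python; `map_phase` and `reduce_phase` are our illustration of the idea, not Hadoop's API.

```python
# Minimal MapReduce sketch: the framework handles grouping and distribution,
# the user supplies only a map function and a reduce function.
from itertools import groupby
from operator import itemgetter

def map_phase(records, mapper):
    """Apply the user's map function to every record, yielding (key, value) pairs."""
    return [kv for rec in records for kv in mapper(rec)]

def reduce_phase(pairs, reducer):
    """Group the pairs by key (the shuffle/sort step) and reduce each group."""
    pairs = sorted(pairs, key=itemgetter(0))
    return {k: reducer([v for _, v in grp])
            for k, grp in groupby(pairs, key=itemgetter(0))}

# Word count: map emits (word, 1), reduce sums the counts per word
lines = ["big data big images", "big analysis"]
counts = reduce_phase(map_phase(lines, lambda ln: [(w, 1) for w in ln.split()]), sum)
print(counts)  # {'analysis': 1, 'big': 3, 'data': 1, 'images': 1}
```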


Apache Spark

MapReduce can be applied to many tasks, but it is often very tricky and time-consuming to express complicated, iterative workflows using just map and reduce. Spark expanded the vocabulary of Hadoop with filter, flatMap, aggregate, join, groupBy, and fold, and incorporated many performance improvements through caching (very important for images) and an interactive (REPL) shell.
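The expanded vocabulary can be illustrated with a plain Python list standing in for an RDD (illustrative only; Spark's real operations run distributed and lazily).

```python
# Spark-style operations sketched on an ordinary list standing in for an RDD.
from collections import defaultdict
from functools import reduce

data = [1, 2, 3, 4, 5, 6]

filtered = [x for x in data if x % 2 == 0]     # filter: keep matching elements
flat = [y for x in data for y in (x, x * 10)]  # flatMap: each element -> several
groups = defaultdict(list)                     # groupBy: collect elements by a key
for x in data:
    groups[x % 2].append(x)
total = reduce(lambda a, b: a + b, data, 0)    # fold: combine with a zero element

print(filtered, total)  # [2, 4, 6] 21
```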

The Framework First


Apache Spark and Hadoop 2

The two frameworks provide a free out of the box solution for

Spark -> Microscopy?

These frameworks are really cool and Spark has a big vocabulary, but flatMap, filter, aggregate, join, groupBy, and fold still do not sound like anything I want to do to an image.

I want to


Spark Image Layer

SIL

Spark Image Layer

We have developed a number of commands in SIL for handling standard image processing tasks

SIL Commands


New components can be added using Spark languages or imported directly from ImageJ. The resulting analyses are then parallelized by the Spark engine and run on multiple cores/CPUs locally, on a cluster, on a supercomputer, or even on a virtual cluster in the cloud, which can be started in seconds.

SIL Commands

How does scaling look?

Simple Analysis


Complicated Analysis

Fault-Tolerance

Fault Tolerance


Fault-tolerance is particularly tedious to add to existing tools; it must be designed in from the first step. The Spark Imaging Layer is fault-tolerant and allows checkpointing with a single result.checkpoint() command, enabling long-running analyses to continue after failures

Flexibility through Types

Developing in Scala brings additional flexibility through types [1]. In microscopy the standard formats are 2-, 3-, and even 4- or more dimensional arrays or matrices, which can be iterated through quickly using CPU and GPU code. While this is still possible in Scala, there is a great deal more flexibility in data types, allowing almost anything to be stored as an image and then processed, as long as the basic operations make sense.

[1] Fighting Bit Rot with Types (Experience Report: Scala Collections), M Odersky, FSTTCS 2009, December 2009


What is an image?

A collection of positions and values, and maybe more (not just an array of doubles). Arrays are efficient for storage in computer memory, but often a poor way of expressing scientific ideas and analyses.

combine information from nearby pixels

determine groups of pixels which are very similar to desired result
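A toy sketch of this view in Python, with a dict of (position, value) pairs standing in for an image; the names and neighborhood logic are our illustration, not SIL's API.

```python
# Toy "image" as a collection of (position, value) pairs rather than an array.
img = {(x, y): float(x == y) for x in range(3) for y in range(3)}  # diagonal line

def neighbors(pos):
    x, y = pos
    return [(x + dx, y + dy) for dx in (-1, 0, 1) for dy in (-1, 0, 1)
            if (dx, dy) != (0, 0)]

# combine information from nearby pixels: mean over the 8-neighborhood
smoothed = {p: sum(img.get(q, 0.0) for q in neighbors(p)) / 8 for p in img}

# determine groups of pixels similar to a desired result: a simple threshold
bright = sorted(p for p, v in img.items() if v > 0.5)
print(bright)  # [(0, 0), (1, 1), (2, 2)]
```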

Practical Meaning

Much simpler code, and combining different pieces of information becomes trivial. For example, calculating the distance to the nearest vessel and then determining the average distance for each cell:

cellDist = vesselImage.invert.
  calculateDistance
cellLabel = cellThreshold.makeLabel
avgCellDist = cellLabel.join(cellDist).
  groupBy(LABEL).reduce(MEAN)

This analysis can be done easily since the types are flexible
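The join / groupBy(LABEL) / reduce(MEAN) pattern above can be sketched with plain Python dicts keyed by pixel position; `cell_label` and `cell_dist` are illustrative stand-ins for the two images, not SIL types.

```python
# Sketch of join-then-groupBy-then-mean on two toy "images" keyed by position.
from collections import defaultdict

cell_label = {(0, 0): 1, (0, 1): 1, (1, 0): 2, (1, 1): 2}         # pixel -> cell label
cell_dist = {(0, 0): 2.0, (0, 1): 4.0, (1, 0): 1.0, (1, 1): 3.0}  # pixel -> distance

by_label = defaultdict(list)
for pos, label in cell_label.items():      # join on the shared pixel position
    by_label[label].append(cell_dist[pos])

avg_cell_dist = {label: sum(d) / len(d) for label, d in by_label.items()}  # MEAN
print(avg_cell_dist)  # {1: 3.0, 2: 2.0}
```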


Cool Picture

m.out<-rbind(data.frame(type="healthy",dist=rnorm(50,100,10)),data.frame(type="sick",dist=rnorm(200,100,20)))
ggplot(m.out)+geom_density(aes(x=dist,color=type))+
  labs(x="Cell to Canal Surface",color="Health")+theme_bw(20)
(plot: density of cell-to-canal-surface distance, colored by health status)

How does this produce better science?

We want to understand the relationship between genetic background and bone structure


Build up of a study

How does this produce better science?

Genetic studies require hundreds to thousands of samples

Genetic LOD Scores


Standard approach (2008)

Today's approach (2014)

Genetic Studies using Big Data

results.df<-data.frame(Task=c("Load and Preprocess","Single Column Average","1 K-means Iteration"),
                       Single.Time=c("360 minutes","4.6s","2 minutes"),
                       Spark.Time=c("10 minutes","400ms","1s"))
names(results.df)[2]<-"Single Core Time"
names(results.df)[3]<-"Spark Time (40 cores)"
kable(results.df)
| Task                  | Single Core Time | Spark Time (40 cores) |
|-----------------------|------------------|-----------------------|
| Load and Preprocess   | 360 minutes      | 10 minutes            |
| Single Column Average | 4.6s             | 400ms                 |
| 1 K-means Iteration   | 2 minutes        | 1s                    |

The science beyond one study

A framework is only as useful as the scientific studies it enables. Here we show the new types of science that are possible using the Spark Imaging Layer. In particular


Our Large Scale Imaging Projects

Zebra fish Phenotyping Project

Collaboration with Keith Cheng, Ian Foster, Xuying Xin, Patrick La Raviere, Steve Wang

1000s of samples of full adult animals, imaged at 0.74 \(\mu m\) resolution: Images 11500 x 2800 x 628 \(\longrightarrow\) 20-40GVx / sample


Brain Project

Collaboration with Alberto Astolfo, Matthias Schneider, Bruno Weber, Marco Stampanoni

1 \(cm^3\) scanned at 1 \(\mu m\) resolution: Images \(\longrightarrow\) 1000 GVx / sample

Imaging as Machine Learning

Scalable high-throughput imaging enables new realms of investigation and analysis


ImageBox

library(ggplot2)
library(reshape2)
parm.table<-expand.grid(Temperature=seq(20,80,length.out=20),Gene=c(1,2,3),pH=seq(3,6,length.out=10))
rf<-function(amplitude=0.1) runif(nrow(parm.table),min=1/(1+amplitude),max=1+amplitude)
parm.table$Thickness<-with(parm.table,10^(pH+as.numeric(Gene))-10*Temperature)*rf(.1)
parm.table$Count<-with(parm.table,10^(as.numeric(Gene))+100*Temperature)*rf(0.01)
parm.table$Connections<-with(parm.table,2^(as.numeric(Gene))-5*as.numeric(Gene)+12)*rf(0.2)
parm.table$GrowthRate<-with(parm.table,log(Thickness/Count)*Gene)*rf(0.2)
parm.table$Strength<-with(parm.table,Count/Connections)*rf(0.1)
parm.table$Viability<-with(parm.table,10^(10-Connections))*rf(0.2)

parm.melt<-melt(parm.table,measure.vars = c("Temperature","Gene","pH"),variable.name="Controlled",value.name="CValue")
strt.melt<-melt(parm.melt,measure.vars = c("Thickness","Count","Connections"),variable.name="Measured",value.name="MValue")
all.melt<-melt(strt.melt,measure.vars = c("GrowthRate","Strength","Viability"),variable.name="Desired",value.name="DValue")

ggplot(all.melt,aes(x=CValue,y=MValue))+
  geom_jitter()+
  facet_grid(Measured~Controlled,scales="free")+
  labs(x="Controlled Value",y="Measured Value")+
  theme_bw(25)
(plot: measured values vs. controlled values, faceted by measured and controlled parameter)

Real-time with Spark Streaming

Before


With Spark

library(igraph)
node.names<-c("Projection\nImages","Most Recent\n1500 images","ROI\nTracking","Exposure\nAdaption",
              "Flat-field\nCorrection","Sinogram\nGeneration","GridRec","Filtering",
              "Segmentation","Histogram","Center\nEstimation","Shape\nAnalysis")
c.mat<-matrix(0,length(node.names),length(node.names))
colnames(c.mat)<-node.names
rownames(c.mat)<-node.names
c.mat["Projection\nImages","Flat-field\nCorrection"]<-1
c.mat["Projection\nImages","Exposure\nAdaption"]<-1
c.mat["Flat-field\nCorrection","Most Recent\n1500 images"]<-1
c.mat["Most Recent\n1500 images","Sinogram\nGeneration"]<-1
c.mat["Flat-field\nCorrection","ROI\nTracking"]<-1
c.mat["Sinogram\nGeneration","GridRec"]<-1
c.mat["Sinogram\nGeneration","Center\nEstimation"]<-1
c.mat["Center\nEstimation","GridRec"]<-1
c.mat["GridRec","Filtering"]<-1
c.mat["Filtering","Histogram"]<-1
c.mat["Filtering","Segmentation"]<-1
c.mat["Segmentation","Shape\nAnalysis"]<-1
g<-graph.adjacency(c.mat,mode="directed")
V(g)$degree <- degree(g)
V(g)$label <- V(g)$name
V(g)$color <- "lightblue"
V(g)["Projection\nImages"]$color<-"red"
V(g)["Flat-field\nCorrection"]$color<-"green"
V(g)["Exposure\nAdaption"]$color<-"green"
V(g)["Sinogram\nGeneration"]$color<-"green"
V(g)["ROI\nTracking"]$color<-"green"
V(g)["Center\nEstimation"]$color<-"green"
V(g)["GridRec"]$color<-"green"
V(g)["Most Recent\n1500 images"]$color<-"green"
V(g)["Most Recent\n1500 images"]$frame.width<-2
V(g)$size<-35
E(g)$width<-2
E(g)$color<-"black"
E(g)[8]$width<-5
E(g)[8]$color<-"red"
plot(g, layout=layout.fruchterman.reingold)
(plot: directed processing graph from projection images through reconstruction to shape analysis)

Real-time with Spark Streaming: Webcam

val wr = new WebcamReceiver(StorageLevel.MEMORY_ONLY,wrDelay,wrThreads)
val ssc = sc.toStreaming(strTime)
val imgList = ssc.receiverStream(wr)

Filter images

val filtImgs = imgList.mapValues(_.run("Median...","radius=3"))

Identify Outliers in Streams

val eventImages = filtImgs.
    transform{
    inImages =>
      val totImgs = inImages.count()
      val bgImage = inImages.values.reduce(_.average(_,1.0f)).multiply(1.0/totImgs)
      val corImage = inImages.map {
        case (inTime,inImage) =>
          val corImage = inImage.subtract(bgImage)
          (corImage.getImageStatistics().mean,(inTime,corImage))
      }
      corImage
  }

Beyond Image Processing

For many datasets, processing, segmentation, and morphological analysis extract all the information needed. For many systems, however, such as bone tissue, cellular tissues, and cellular materials, the structure is just the beginning: the most interesting results come from applying physical, chemical, or biological rules inside these structures.

\[ \sum_j \vec{F}_{ij} = m\ddot{x}_i \]


Such systems can be easily represented by a graph and analyzed using GraphX in a distributed, fault-tolerant manner.

library(igraph)
xsize<-4
ysize<-xsize
grid.df<-expand.grid(x=1:xsize,y=1:ysize)
node.names<-dlply(grid.df,.(x,y),function(w) paste(w$x,w$y,sep=","))
c.mat<-matrix(0,length(node.names),length(node.names))
colnames(c.mat)<-node.names
rownames(c.mat)<-node.names
for(c.index in 1:length(node.names)) {
  ke<-function(i) {
    # keep-edge test: is i a valid grid neighbor of c.index?
    out<-F
    if(i>=1 & i<=length(node.names)) {
      out<-T
      if(c.index%%xsize==0 & i-c.index==1) out<-F
      if(c.index%%xsize==1 & i-c.index==-1) out<-F
    } 
    out
  }
  for(d.index in (c.index+c(-1,1,-xsize,xsize))) {
    if(ke(d.index)) c.mat[d.index,c.index]<-1
  }
}
g<-graph.adjacency(c.mat,mode="directed")
V(g)$degree <- degree(g)
V(g)$label <- V(g)$name
V(g)$color <- "lightblue"

V(g)$size<-35
E(g)$width<-4
E(g)$color<-"black"

plot(g, layout=as.matrix(grid.df),vertex.shape="csquare")
(plot: 4x4 grid graph with nearest-neighbor connections)

Cellular Potts Simulations

For cellular systems, a model called the Cellular Potts Model (CPM) is used to simulate growth, differentiation, coarsening, phase changes, and a number of similar properties of complex systems. Implementing such algorithms has traditionally been very tedious, requiring thousands of lines of optimized MPI code (which is difficult to program well and in a stable manner). For real imaging data, the number of elements in these simulations can exceed 14 billion, with 378 billion edges, run over thousands of iterations.

\[ E = \frac{1}{2} \sum_i\sum_{\langle j\rangle_i} J[1 - \delta(S_i - S_j)] \]

Thomas, G., de Almeida, R., & Graner, F. (2006). Coarsening of three-dimensional grains in crystals, or bubbles in dry foams, tends towards a universal, statistically scale-invariant regime. Physical Review E, 74(2), 021407. doi:10.1103/PhysRevE.74.021407
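The energy above can be evaluated directly on a small label grid; this serial Python sketch is purely illustrative (the real simulations run distributed over billions of elements, not like this).

```python
# Illustrative serial evaluation of the CPM boundary energy on a tiny 2D grid.
def potts_energy(labels, J=1.0):
    """E = (1/2) * sum_i sum_{j in neighbors(i)} J * [1 - delta(S_i, S_j)]."""
    n_rows, n_cols = len(labels), len(labels[0])
    E = 0.0
    for i in range(n_rows):
        for j in range(n_cols):
            for di, dj in ((1, 0), (-1, 0), (0, 1), (0, -1)):  # 4-neighborhood
                ni, nj = i + di, j + dj
                if 0 <= ni < n_rows and 0 <= nj < n_cols \
                        and labels[i][j] != labels[ni][nj]:
                    E += J  # unlike neighbors contribute J
    return E / 2.0  # each unlike pair was counted from both sides

grid = [[1, 1, 2],
        [1, 1, 2],
        [1, 1, 2]]
print(potts_energy(grid))  # 3.0 -- three boundary bonds between the two domains
```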


gpotts<-graph.adjacency(c.mat,mode="undirected")
V(gpotts)$degree <- degree(gpotts)
V(gpotts)$label <- V(gpotts)$name
V(gpotts)$color <- "lightblue"
V(gpotts)$size<-35

E(gpotts)$width<-2
E(gpotts)$color<-"black"

sph<-function(gpotts,ind,color="orange",name="tumor") {
  V(gpotts)[ind]$color<-color
  V(gpotts)[ind]$label<-name
  gpotts
} 
for(i in seq(1,nrow(c.mat),1)) gpotts<-sph(gpotts,i,color="green",name="healthy")
for(i in seq(1,xsize+ysize/2)) gpotts<-sph(gpotts,i)

plot(gpotts,  layout=as.matrix(grid.df),vertex.shape="csquare")
(plot: grid graph with healthy and tumor regions)

for(i in seq(1,nrow(c.mat),1)) gpotts<-sph(gpotts,i,color="green",name="healthy")
for(i in seq(1,2*xsize+ysize/2)) gpotts<-sph(gpotts,i)
for(i in seq(1,xsize/2)) gpotts<-sph(gpotts,i,color="grey20",name="necrotic")
plot(gpotts,  layout=as.matrix(grid.df),vertex.shape="csquare")
(plot: grid graph with healthy, tumor, and necrotic regions)

Beyond: Approximate Results

Extensions of Spark out of AMPLab, like BlinkDB, are moving towards approximate results.

For real-time image processing, with tasks like component labeling and filtering, approximate results could work well

This provides a much more robust solution than taking a small region of interest or scaling the image down
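A toy sketch of the approximate-results idea: estimate a binary image's foreground volume fraction from a random sample of pixels instead of a full pass (illustrative only; BlinkDB's actual machinery is far more sophisticated).

```python
# Estimate a volume fraction from a 2% random sample of the pixels.
import random
random.seed(0)

image = [random.random() < 0.3 for _ in range(100000)]  # synthetic binary "image"
exact = sum(image) / len(image)                          # full-pass answer

sample = random.sample(image, 2000)                      # inspect only 2% of pixels
approx = sum(sample) / len(sample)                       # close to exact, much cheaper
```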

Beyond: Improving Performance

Scala code can be slow, but it is very high level and can be automatically translated to CPU or GPU code with projects like ScalaCL

val myData = Array(1,2,3,4,5)
val doubleSum = myData.map(_*2).reduce(_+_)

Using ScalaCL to run on GPUs with 2-4X performance improvement

val clData = myData.cl
val doubleSum = clData.map(_*2).reduce(_+_)

Pipe

The Spark Imaging Layer is fully compatible with the notion of 'pipes': replacing pieces of the analysis with other languages and tools.

We can also automatically choose among (and validate) different tools for the same analysis to increase performance. Running on your laptop means you can profile code and see exactly where the bottlenecks are \(\rightarrow\) "Premature optimization is the root of all evil" - Donald Knuth

Reality Check


But

We have a cool tool, but what does this mean for me?

A spinoff - 4Quant: From images to insight


Education / Training

Acknowledgements


We are interested in partnerships and collaborations

Learn more at

Using Amazon's Cloud

We have prepared an introduction to running Spark on the Amazon Cloud: http://4quant.github.io/aws-spark-introduction/

./spark-ec2 -k spark-key -i ~/Downloads/spark-key.pem -s 60 launch big-data-test-cluster --region=ap-southeast-2

Hadoop Filesystem (HDFS not HDF5)

The bottleneck is the filesystem connection: many nodes (10+) reading in parallel bring even a GPFS-based fiber system to a crawl

SIL


One of the central tenets of MapReduce(tm) is data-centric computation \(\rightarrow\) instead of moving data to the computation, move the computation to the data.

SIL

Hyperspectral Imaging

Hyperspectral imaging is a rapidly growing area with the potential for massive datasets and a severe deficit of usable tools.

hyper.path<-function(filename) paste("/Users/mader/Dropbox/WorkRelated/SpectralImaging/",filename,sep="")
map.img<-t(readJpeg(hyper.path("raw.jpg"))[,,3]+readJpeg(hyper.path("raw.jpg"))[,,1])
map.df<-im.to.df(map.img)
impos<-read.table(hyper.path("impos.csv"),sep=",")
names(impos)<-c("x","y")
ggplot(map.df,aes(x,y))+
  geom_raster(aes(fill=val))+
  #geom_tile(data=impos,fill="red",alpha=0.5)+
  geom_point(data=impos,alpha=0.7)+
  geom_point(data=subset(impos, abs(x-252)<20 & abs(y-293)<20),color="red")+
  labs(fill="White Field\nIntensity")+
  xlim(min(impos$x),max(impos$x))+
  ylim(min(impos$y),max(impos$y))+
  coord_equal()+
  theme_bw(20)
(plot: white-field intensity map with scanned positions marked, highlighted region in red)


The scale of the data is large, and standard image processing tools are ill-suited to handling it, although the ideas used in image processing (filtering, thresholding, segmentation, ...) are equally applicable to hyperspectral data, and distributed, parallel approaches make even more sense on such massive datasets

full.df<-read.csv("/Users/mader/Dropbox/WorkRelated/SpectralImaging/full_img.csv")

full.df<-subset(full.df,wavenum<2500)
ggplot(
  subset(full.df, abs(x-252)<20 & abs(y-293)<20),aes(wavenum,val)
  )+
  geom_line()+
  facet_grid(x~y)+
  labs(x=expression(paste("Wavenumber (",cm^-1,")",sep="")),y="Intensity (au)")+
  theme_bw(10)
(plot: intensity vs. wavenumber spectra for pixels near position (252, 293))

With SIL

Using SIL, hyperspectral images are treated the same way as normal images, and all of the operations you apply to a normal image are applicable, as long as the underlying operations are defined. A Gaussian filter might seem like a strange operation at first, but using information from each point's neighbors to reduce noise makes sense.
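A hypothetical sketch (not SIL's API) of why this works: if each "pixel" holds a whole spectrum, a box- or Gaussian-style smoothing filter is the same operation as on a scalar image, as long as channel-wise addition and scaling are defined.

```python
# Each pixel holds a spectrum (a list of intensities); smoothing averages the
# spectra of spatial neighbors, channel by channel.
img = {(x, y): [float(x), float(y), 1.0] for x in range(3) for y in range(3)}

def box_smooth(img, pos):
    x, y = pos
    near = [img[(x + dx, y + dy)] for dx in (-1, 0, 1) for dy in (-1, 0, 1)
            if (x + dx, y + dy) in img]
    summed = [sum(vals) for vals in zip(*near)]  # channel-wise sum over neighbors
    return [v / len(near) for v in summed]       # channel-wise mean

print(box_smooth(img, (1, 1)))  # [1.0, 1.0, 1.0]
```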


Identify cells from background by K-Means (2 groups) and then count the cells

clusters = KMeans.train(specImage,2)
labeledCells = specImage.map(clusters.identify(_)).
  makeLabels
println(labeledCells.distinct.count+" cells")

K-Means Optimized (MPI/CUDA)

Taken from Serban's K-Means CUDA Project (https://github.com/serban/kmeans/blob/master/cuda_kmeans.cu)

checkCuda(cudaMalloc(&deviceObjects, numObjs*numCoords*sizeof(float)));
checkCuda(cudaMalloc(&deviceClusters, numClusters*numCoords*sizeof(float)));
checkCuda(cudaMalloc(&deviceMembership, numObjs*sizeof(int)));
checkCuda(cudaMalloc(&deviceIntermediates, numReductionThreads*sizeof(unsigned int)));
checkCuda(cudaMemcpy(deviceObjects, dimObjects[0],
  numObjs*numCoords*sizeof(float), cudaMemcpyHostToDevice));
checkCuda(cudaMemcpy(deviceMembership, membership,
  numObjs*sizeof(int), cudaMemcpyHostToDevice));
do {
  checkCuda(cudaMemcpy(deviceClusters, dimClusters[0],
                  numClusters*numCoords*sizeof(float), cudaMemcpyHostToDevice));
  find_nearest_cluster
            <<< numClusterBlocks, numThreadsPerClusterBlock, clusterBlockSharedDataSize >>>
            (numCoords, numObjs, numClusters,
             deviceObjects, deviceClusters, deviceMembership, deviceIntermediates);
  cudaDeviceSynchronize(); checkLastCudaError();
  compute_delta <<< 1, numReductionThreads, reductionBlockSharedDataSize >>>
      (deviceIntermediates, numClusterBlocks, numReductionThreads);
  cudaDeviceSynchronize(); checkLastCudaError();
  int d;
  checkCuda(cudaMemcpy(&d, deviceIntermediates,
            sizeof(int), cudaMemcpyDeviceToHost));
  delta = (float)d;
  checkCuda(cudaMemcpy(membership, deviceMembership,
            numObjs*sizeof(int), cudaMemcpyDeviceToHost));
  for (i=0; i<numObjs; i++) {
    /* find the array index of the nearest cluster center */
    index = membership[i];

    /* update new cluster centers: sum of objects located within */
    newClusterSize[index]++;
    for (j=0; j<numCoords; j++)
      newClusters[j][index] += objects[i][j];
  }

K-Means Declarative

  1. Starting list of points: Points
  2. Take 4 random points from the list \(\rightarrow\) Centers
  3. Create a function called nearestCenter which takes a point \(\vec{x}\) and returns the nearest center to the point and the point itself
nearestCenter(x) = {
  for cCenter in Centers
    pointDist(cCenter) = dist(x,cCenter)
  end
  return (argmin(pointDist),x)
}
  4. Map nearestCenter onto Points as NearestCenterList
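The four steps above can be sketched in plain Python; the names are illustrative, and the final map is serial here, whereas Spark would distribute it.

```python
# Declarative K-means assignment step, sketched serially.
import random
random.seed(1)

points = [(random.random(), random.random()) for _ in range(200)]  # 1. Points
centers = random.sample(points, 4)                                  # 2. Centers

def nearest_center(x):                                              # 3.
    dists = [(cx - x[0]) ** 2 + (cy - x[1]) ** 2 for cx, cy in centers]
    return (dists.index(min(dists)), x)

nearest_center_list = [nearest_center(p) for p in points]           # 4. map
```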

Getting an image to Key-Value Format

library(jpeg)
in.img<-readJPEG("ext-figures/input_image.jpg")
kv.img<-im.to.df(in.img)
write.table(kv.img,"cell_colony.csv",row.names=F,col.names=F,sep=",")
kable(head(kv.img))
| x | y | val       |
|---|---|-----------|
| 1 | 1 | 0.6274510 |
| 2 | 1 | 0.7803922 |
| 3 | 1 | 0.8862745 |
| 4 | 1 | 0.8980392 |
| 5 | 1 | 0.9098039 |
| 6 | 1 | 0.9215686 |

The key is the position \(\langle x, y \rangle\) and the value is the intensity \(val\)
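A rough sketch of that conversion in Python, mirroring what im.to.df produces (1-based positions; the tiny array is illustrative):

```python
# Turn a small 2D array into ((x, y), val) key-value records.
img = [[0.1, 0.9],
       [0.8, 0.2]]

kv_img = [((x + 1, y + 1), img[y][x])   # 1-based positions, like the R data frame
          for y in range(len(img))
          for x in range(len(img[0]))]

print(kv_img[0])  # ((1, 1), 0.1)
```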

Loading the data into Spark (Scala)

val rawImage=sc.textFile("cell_colony.csv")
val imgAsColumns=rawImage.map(_.split(","))
val imgAsKV=imgAsColumns.map(point => ((point(0).toInt,point(1).toInt),point(2).toDouble))
imgAsKV.count
imgAsKV.take(1)
imgAsKV.sample(true,0.1,0).collect

Perform a threshold

val threshVal=0.5
val labelImg=imgAsKV.filter(_._2<threshVal)

Get Volume Fraction

100.0*labelImg.count/(imgAsKV.count)

Region of Interest

Take a region of interest between 0 and 100 in X and Y

def roiFun(pvec: ((Int,Int),Double)) = 
 {pvec._1._1>=0 & pvec._1._1<100 & // X
  pvec._1._2>=0 & pvec._1._2<100 } //Y
val roiImg=imgAsKV.filter(roiFun)

Perform a 3x3 box filter

def spread_voxels(pvec: ((Int,Int),Double), windSize: Int = 1) = {
  val wind=(-windSize to windSize)
  val pos=pvec._1
  val scalevalue=pvec._2/(wind.length*wind.length)
  for(x<-wind; y<-wind) 
    yield ((pos._1+x,pos._2+y),scalevalue)
}

val filtImg=roiImg.
      flatMap(cvec => spread_voxels(cvec)).
      filter(roiFun).reduceByKey(_ + _)

Setting up Component Labeling

val xWidth=100
var newLabels=labelImg.map(pvec => (pvec._1,(pvec._1._1.toLong*xWidth+pvec._1._2+1,true)))
def spread_voxels(pvec: ((Int,Int),(Long,Boolean)), windSize: Int = 1) = {
  val wind=(-windSize to windSize)
  val pos=pvec._1
  val label=pvec._2._1
  for(x<-wind; y<-wind) 
    yield ((pos._1+x,pos._2+y),(label,(x==0 & y==0)))
}

Running Component Labeling

var groupList=Array((0L,0))
var running=true
var iterations=0
while (running) {
  newLabels=newLabels.
  flatMap(spread_voxels(_,1)).
    reduceByKey((a,b) => ((math.min(a._1,b._1),a._2 | b._2))).
    filter(_._2._2)
  // make a list of each label and how many voxels are in it
  val curGroupList=newLabels.map(pvec => (pvec._2._1,1)).
    reduceByKey(_ + _).sortByKey(true).collect
  // if the list isn't the same as before, continue running since we need to wait for swaps to stop
  running = (curGroupList.deep!=groupList.deep)
  groupList=curGroupList
  iterations+=1
  print("Iter #"+iterations+":"+groupList.mkString(","))
}
groupList
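For intuition, the same minimum-label propagation can be sketched serially in Python; the toy pixels and labels here are illustrative, not the Spark data structures above.

```python
# Serial toy version of iterative connected-component labeling: every foreground
# pixel starts with a unique label and repeatedly takes the minimum label among
# its neighbors until nothing changes (the fixed point the Spark loop reaches).
pixels = {(0, 0), (0, 1), (1, 1), (3, 3)}              # two connected components
labels = {p: i for i, p in enumerate(sorted(pixels))}  # unique starting labels

changed = True
while changed:                                         # iterate until stable
    changed = False
    for (x, y) in pixels:
        for q in ((x + 1, y), (x - 1, y), (x, y + 1), (x, y - 1)):
            if q in labels and labels[q] < labels[(x, y)]:
                labels[(x, y)] = labels[q]             # take the smaller label
                changed = True

print(sorted(set(labels.values())))  # [0, 3] -- one label per component
```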

Calculating From Images

val labelSize = newLabels.
  map(pvec => (pvec._2._1,1)).
  reduceByKey((a,b) => (a+b)).
  map(_._2)
labelSize.reduce((a,b) => (a+b))*1.0/labelSize.count
val labelPositions = newLabels.
  map(pvec => (pvec._2._1,pvec._1)).
  groupBy(_._1)
def posAvg(pvec: Seq[(Long,(Int,Int))]): (Double,Double) = {
  val sumPt=pvec.map(_._2).reduce((a,b) => (a._1+b._1,a._2+b._2))
  (sumPt._1*1.0/pvec.length,sumPt._2*1.0/pvec.length)
}
print(labelPositions.map(pvec=>posAvg(pvec._2)).mkString(","))

Lazy evaluation

Applying Machine Learning to Imaging

The needs of advertising, marketing, and recommendation engines have powered the creation of many new machine learning tools capable of processing huge volumes of data. One of the most actively developed projects, MLlib, is a module of Spark and can be combined directly with the Imaging Layer.